Web Streams API
データを逐次処理するWeb API
ストリーム API の概念 - Web API | MDN
ストリームAPIを使いこなす
/mrsekut-p/Web Streams API
async-libのthrottleをTransformStreamで実装したいtakker.icon
chunkをとばせばいい?
Queuing strategy
https://developer.mozilla.org/ja/docs/Web/API/Streams_API/Concepts#内部キューとキューイング戦略
streamの流速を調整する設定
QueuingStrategyを渡す
size: R1つを何個として数えるかを指定する
highWaterMark:
ReadableStreamController.desiredSizeと内部キューのサイズの合計
desiredは負値やnullになる可能性がある
https://developer.mozilla.org/ja/docs/Web/API/ReadableStreamDefaultController/desiredSize
組み込みのQueuingStrategy
CountQueuingStrategyhttps://developer.mozilla.org/ja/docs/Web/API/CountQueuingStrategy
1 byteを1個とカウントする
ByteLengthQueuingStrategy https://developer.mozilla.org/ja/docs/Web/API/ByteLengthQueuingStrategy
R一つを1個とカウントする
R = Uint8Arrayのとき、CountQueuingStrategyはUint8Array objectを1個として数えるが、ByteLengthQueuingStrategyは実際のバイト数で数える
defaultだとhighWaterMark=1のCountQueuingStrategy<T>が渡されるhttps://developer.mozilla.org/ja/docs/Web/API/ReadableStream/ReadableStream#sect1
https://speakerdeck.com/tasshi/tskaigi-2025-web-streams-api?slide=24
queueの図解わかりやすい
backpressureを使う
desiredSizeで検知
ReadableStream
(ReadableStreamController.desiredSize ?? -1) <= 0のとき内部queueがいっぱいだと判定する
内部queueが空になり、下流からデータ要求が発生するとUnderlyingSourcePullCallback<R>が呼ばれる
このときbackpressureが解除される
UnderlyingByteSource.autoAllocateChunkSizeが> 0のときの挙動
ReadableStreamBYOBRequestで
viewが存在する場面
https://triple-underscore.github.io/Streams-ja.html#readable-byte-stream-controller-get-byob-request
Denoで試したところ、start()だとReadableByteStreamController.byobRequestが存在しない
code:queueが空になってからpushを再開する.ts
/// <reference lib="dom.asynciterable" />
import { delay } from "jsr:@std/async@^1.0.11/delay";
const highWaterMark = 128;
const readable = new ReadableStream({
type: "bytes",
async start(controller) {
while (true) {
const queuedSize = highWaterMark - (controller.desiredSize ?? 0);
if (controller.byobRequest) {
const chunk = controller.byobRequest.view!;
const buffer = new Uint8Array(
chunk.buffer,
chunk.byteOffset,
chunk.byteLength,
);
for (let i = 0; i < chunk.byteLength; i++) {
bufferi = i + queuedSize;
}
controller.byobRequest.respond(chunk.byteLength);
} else {
const chunkSize = this.autoAllocateChunkSize!;
if ((controller.desiredSize ?? 0) < chunkSize) return;
console.log("Desired size", controller.desiredSize);
const chunk = new Uint8Array(chunkSize);
for (let i = 0; i < chunk.length; i++) {
chunki = i + queuedSize;
}
controller.enqueue(chunk);
}
await delay(0);
}
},
pull(controller) {
// この行を消すと、queueに少しでも空きができればpushを再開する
if ((controller.desiredSize ?? 0) < highWaterMark) return;
return this.start!(controller);
},
autoAllocateChunkSize: 16,
}, { highWaterMark });
for await (const chunk of readable) {
console.log(chunk);
await delay(1000);
}
TransformStream
(TransformStreamController.desiredSize ?? -1) <= 0のとき内部queueがいっぱいだと判定する
WritableStream
UnderlyingSinkWriteCallback<W>でPromiseを返すhttps://developer.mozilla.org/ja/docs/Web/API/WritableStream/WritableStream#背圧
書き込みが完了するまでに受診されたchunksは、ReadableStreamの内部queueに貯まる
chunksの量がhighWaterMarkに到達すると、(backpressureを考慮したReadableStream/TransformStreamだった場合)上流へ背圧がかかる
ReadableStream.tee()
消費速度の速いほうを基準に背圧がかかる
https://developer.mozilla.org/docs/Web/API/ReadableStream/tee
https://speakerdeck.com/tasshi/tskaigi-2025-web-streams-api?slide=43
仕様書の作成例
https://streams.spec.whatwg.org/#creating-examples
日本語訳
#2025-09-07 11:56:57
#2025-09-06 01:09:38
#2025-05-25 16:45:04
#2024-08-23 16:34:53
#2024-07-08 12:33:11